package com.shujia.flink.sql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * Demo: reads a MySQL table through the Flink SQL `jdbc` connector and writes
 * every row to a `print` connector table.
 */
object Demo8JDbcSource {

  /**
   * Entry point.
   *
   * Registers the JDBC-backed table `student_mysql`, registers `print_table`
   * with the same columns, then runs `INSERT INTO print_table SELECT * FROM
   * student_mysql` and blocks until that job has finished.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Streaming-mode settings; switch to `.inBatchMode()` to run the same
    // statements as a batch job.
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      //.inBatchMode()
      .build()

    // Flink SQL environment used to execute all statements below.
    val table: TableEnvironment = TableEnvironment.create(settings)

    // Source table mapped onto the MySQL table `students` in database `bigdata`.
    // NOTE(review): the connection URL and credentials are hard-coded in the
    // DDL; fine for a demo, but they should come from configuration elsewhere.
    table.executeSql(
      """
        |
        |CREATE TABLE student_mysql (
        |  id BIGINT,
        |  name STRING,
        |  age BIGINT,
        |  gender STRING,
        |  clazz STRING
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://master:3306/bigdata',
        |   'table-name' = 'students',
        |   'username' = 'root',
        |   'password' = '123456'
        |)
        |
        |""".stripMargin)

    // Sink table: copies only the column definitions of `student_mysql`
    // (EXCLUDING ALL drops its connector options) and uses the `print` connector.
    table.executeSql(
      """
        |
        |CREATE TABLE print_table
        |WITH ('connector' = 'print')
        |LIKE student_mysql (EXCLUDING ALL)
        |""".stripMargin)

    // `executeSql` only submits the INSERT job and returns immediately, so keep
    // the result and wait on it. Without this, `main` may return before any row
    // is printed and job failures would go unnoticed.
    val insertResult = table.executeSql(
      """
        |
        |insert into print_table
        |select * from student_mysql
        |
        |""".stripMargin)

    insertResult.await()
  }

}
